/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.cloud.overseer;

import static org.apache.solr.common.cloud.ZkStateReader.LIVE_NODE_NODE_NAME;
import static org.apache.solr.common.cloud.ZkStateReader.LIVE_NODE_SOLR_VERSION;

import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.api.util.SolrVersion;
import org.apache.solr.cloud.OverseerTest;
import org.apache.solr.cloud.Stats;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkTestServer;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocCollectionWatcher;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.CommonTestInjection;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.common.util.ZLibCompressor;
import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@LogLevel(
    "org.apache.solr.common.cloud.ZkStateReader=DEBUG;org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG")
public class ZkStateReaderTest extends SolrTestCaseJ4 {
  /** SLF4J logger for this test class. */
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  /** Timeout, in seconds, passed (with {@code TimeUnit.SECONDS}) to {@code waitForState} calls. */
  private static final long TIMEOUT = 30;

  /**
   * Bundles the embedded ZooKeeper server, the client connected to it, and the state reader/writer
   * built on that client, so a test can create and release them as one unit.
   */
  private static class TestFixture implements Closeable {
    private final ZkTestServer server;
    private final SolrZkClient zkClient;
    private final ZkStateReader reader;
    private final ZkStateWriter writer;

    private TestFixture(
        ZkTestServer server, SolrZkClient zkClient, ZkStateReader reader, ZkStateWriter writer) {
      this.server = server;
      this.zkClient = zkClient;
      this.reader = reader;
      this.writer = writer;
    }

    /**
     * Closes the reader and the client, then shuts down the ZooKeeper server. The server is shut
     * down even when closing the reader or client throws, so a failure there cannot leak it.
     *
     * @throws IOException if closing the reader or the client fails
     */
    @Override
    public void close() throws IOException {
      try {
        IOUtils.close(reader, zkClient);
      } finally {
        try {
          server.shutdown();
        } catch (InterruptedException e) {
          // Shutting down anyway, but keep the interrupt visible to the caller.
          Thread.currentThread().interrupt();
        }
      }
    }
  }

  /** The fixture for the currently running test; replaced before every test by setUp. */
  private TestFixture fixture;

  /**
   * Runs the base-class setup, then builds a fresh ZooKeeper server, client, reader and writer for
   * this test. The writer gets a compression threshold of {@code -1} (presumably meaning "never
   * compress" — confirm against ZkStateWriter).
   */
  @Override
  @Before
  public void setUp() throws Exception {
    super.setUp();
    final String tempDirPrefix = getTestName();
    final int compressionThreshold = -1;
    fixture = setupTestFixture(tempDirPrefix, compressionThreshold);
  }

  /**
   * Closes the per-test fixture, then runs the base-class teardown. The field is cleared before
   * closing so a closed fixture is never reused or closed a second time. The base-class teardown
   * runs even if closing the fixture throws.
   */
  @Override
  @After
  public void tearDown() throws Exception {
    TestFixture toClose = fixture;
    fixture = null;
    try {
      if (toClose != null) {
        toClose.close();
      }
    } finally {
      super.tearDown();
    }
  }

  /**
   * Starts an embedded ZooKeeper server in a fresh temp directory and connects a client to it. It
   * then creates the cluster znodes, a reader with cluster-state watchers installed, and a writer.
   *
   * @param testPrefix prefix for the temporary ZooKeeper data directory
   * @param minStateByteLenForCompression passed through unchanged to the {@link ZkStateWriter}
   * @return a fixture that owns every resource created here
   * @throws Exception if any step fails; anything created up to that point is released first
   */
  private static TestFixture setupTestFixture(String testPrefix, int minStateByteLenForCompression)
      throws Exception {
    Path zkDir = createTempDir(testPrefix);
    ZkTestServer server = new ZkTestServer(zkDir);
    SolrZkClient zkClient = null;
    ZkStateReader reader = null;
    try {
      server.run();
      zkClient =
          new SolrZkClient.Builder()
              .withUrl(server.getZkAddress())
              .withTimeout(OverseerTest.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)
              .build();
      ZkController.createClusterZkNodes(zkClient);

      reader = new ZkStateReader(zkClient);
      reader.createClusterStateWatchersAndUpdate();

      ZkStateWriter writer =
          new ZkStateWriter(reader, new Stats(), minStateByteLenForCompression, new ZLibCompressor());

      return new TestFixture(server, zkClient, reader, writer);
    } catch (Exception e) {
      // Setup failed part-way: release what was created so the real failure isn't followed by
      // leaked servers/clients (and their threads). Cleanup errors are attached, not thrown.
      IOUtils.closeWhileHandlingException(reader, zkClient);
      try {
        server.shutdown();
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        e.addSuppressed(ie);
      } catch (Exception cleanupFailure) {
        e.addSuppressed(cleanupFailure);
      }
      throw e;
    }
  }

  /**
   * A collection with no locally registered core is exposed as a lazily loaded reference.
   * Registering a core switches it to an eagerly loaded one, and unregistering switches it back.
   */
  public void testExternalCollectionWatchedNotWatched() throws Exception {
    final String collectionName = "c1";
    final ZkStateReader stateReader = fixture.reader;
    final ZkStateWriter stateWriter = fixture.writer;
    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName, true);

    // Write an empty collection to ZooKeeper and make the reader see it.
    DocCollection newCollection =
        DocCollection.create(
            collectionName,
            new HashMap<>(),
            Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
            DocRouter.DEFAULT,
            0,
            Instant.now(),
            PerReplicaStatesOps.getZkClientPrsSupplier(
                fixture.zkClient, DocCollection.getCollectionPath(collectionName)));
    List<ZkWriteCommand> commands =
        Collections.singletonList(new ZkWriteCommand(collectionName, newCollection));
    stateWriter.enqueueUpdate(stateReader.getClusterState(), commands, null);
    stateWriter.writePendingUpdates();
    stateReader.forceUpdateCollection(collectionName);

    // No core registered: lazy reference.
    assertTrue(stateReader.getClusterState().getCollectionRef(collectionName).isLazilyLoaded());

    // Core registered: eager reference.
    stateReader.registerCore(collectionName);
    assertFalse(stateReader.getClusterState().getCollectionRef(collectionName).isLazilyLoaded());

    // Core unregistered again: back to lazy.
    stateReader.unregisterCore(collectionName);
    assertTrue(stateReader.getClusterState().getCollectionRef(collectionName).isLazilyLoaded());
  }

  /**
   * Verifies that a property added by rewriting c1's state.json eventually becomes visible through
   * {@link ZkStateReader#getClusterState()}.
   */
  public void testCollectionStateWatcherCaching() throws Exception {
    ZkStateWriter writer = fixture.writer;
    ZkStateReader reader = fixture.reader;

    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);

    DocCollection state =
        DocCollection.create(
            "c1",
            new HashMap<>(),
            Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
            DocRouter.DEFAULT,
            0,
            Instant.now(),
            PerReplicaStatesOps.getZkClientPrsSupplier(
                fixture.zkClient, DocCollection.getCollectionPath("c1")));
    ZkWriteCommand wc = new ZkWriteCommand("c1", state);
    writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
    writer.writePendingUpdates();
    assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
    // Use the class-wide TIMEOUT rather than a 1 second wait, which leaves little headroom on a
    // loaded machine; a successful run returns as soon as the state appears either way.
    reader.waitForState(
        "c1", TIMEOUT, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState != null);

    // Rewrite state.json with an extra "x" -> "y" property.
    Map<String, Object> props = new HashMap<>();
    props.put("x", "y");
    props.put(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME);
    state =
        DocCollection.create(
            "c1",
            new HashMap<>(),
            props,
            DocRouter.DEFAULT,
            0,
            Instant.now(),
            PerReplicaStatesOps.getZkClientPrsSupplier(
                fixture.zkClient, DocCollection.getCollectionPath("c1")));
    wc = new ZkWriteCommand("c1", state);
    writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
    writer.writePendingUpdates();

    boolean found = false;
    TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
    while (!timeOut.hasTimedOut()) {
      DocCollection c1 = reader.getClusterState().getCollection("c1");
      if ("y".equals(c1.getStr("x"))) {
        found = true;
        break;
      }
      // Back off between polls instead of busy-spinning a CPU core for the whole timeout.
      TimeUnit.MILLISECONDS.sleep(50);
    }
    assertTrue("Could not find updated property in collection c1 even after 5 seconds", found);
  }

  /**
   * Registering a core before its collection exists must not make the collection appear, and
   * neither does a bare collection znode. Once state.json is written, the collection shows up as
   * an eagerly loaded reference whose creation time equals the state znode's ctime.
   */
  public void testWatchedCollectionCreation() throws Exception {
    final String collectionName = "c1";
    final ZkStateReader stateReader = fixture.reader;
    final ZkStateWriter stateWriter = fixture.writer;

    stateReader.registerCore(collectionName);
    // Nothing has been written to ZooKeeper yet.
    assertNull(stateReader.getClusterState().getCollectionRef(collectionName));

    // The collection znode alone (without state.json) is not enough to produce a collection.
    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName, true);
    stateReader.forceUpdateCollection(collectionName);
    assertNull(stateReader.getClusterState().getCollectionRef(collectionName));

    DocCollection created =
        DocCollection.create(
            collectionName,
            new HashMap<>(),
            Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
            DocRouter.DEFAULT,
            0,
            Instant.now(),
            PerReplicaStatesOps.getZkClientPrsSupplier(
                fixture.zkClient, DocCollection.getCollectionPath(collectionName)));
    ZkWriteCommand createCommand = new ZkWriteCommand(collectionName, created);
    stateWriter.enqueueUpdate(
        stateReader.getClusterState(), Collections.singletonList(createCommand), null);
    stateWriter.writePendingUpdates();

    String stateJsonPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName + "/state.json";
    assertTrue(fixture.zkClient.exists(stateJsonPath, true));

    // Wait for the reader to pick the collection up without forcing an update.
    stateReader.waitForState(
        collectionName, TIMEOUT, TimeUnit.SECONDS, (liveNodes, collection) -> collection != null);
    ClusterState.CollectionRef collectionRef =
        stateReader.getClusterState().getCollectionRef(collectionName);
    assertNotNull(collectionRef);
    assertFalse(collectionRef.isLazilyLoaded());

    Stat znodeStat = new Stat();
    fixture.zkClient.getData(
        ZkStateReader.getCollectionPath(collectionName), null, znodeStat, false);
    Instant expectedCreationTime = Instant.ofEpochMilli(znodeStat.getCtime());
    assertEquals(expectedCreationTime, collectionRef.get().getCreationTime());
  }

  /**
   * Verifies that znode and child versions are correct and that version changes trigger cluster
   * state updates.
   *
   * <p>Uses a per-replica-state (PRS) collection. Adding or flipping a PRS entry must change only
   * the child-node version, not the znode version. Deleting and re-creating the collection must
   * start the child-node version from 0 again.
   */
  public void testNodeVersion() throws Exception {
    ZkStateWriter writer = fixture.writer;
    ZkStateReader reader = fixture.reader;

    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);

    ClusterState clusterState = reader.getClusterState();
    // create new collection
    DocCollection state =
        DocCollection.create(
            "c1",
            new HashMap<>(),
            Map.of(
                ZkStateReader.CONFIGNAME_PROP,
                ConfigSetsHandler.DEFAULT_CONFIGSET_NAME,
                DocCollection.CollectionStateProps.PER_REPLICA_STATE,
                "true"),
            DocRouter.DEFAULT,
            0,
            Instant.now(),
            PerReplicaStatesOps.getZkClientPrsSupplier(
                fixture.zkClient, DocCollection.getCollectionPath("c1")));
    ZkWriteCommand wc = new ZkWriteCommand("c1", state);
    writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
    clusterState = writer.writePendingUpdates();

    // have to register it here after the updates, otherwise the child node watch will not be
    // inserted
    reader.registerCore("c1");

    // A TimeOut's deadline is fixed when it is constructed, so every wait below gets a fresh one;
    // sharing a single instance would give the whole test one 5 second budget.
    fiveSecondTimeOut()
        .waitFor(
            "Timeout on waiting for c1 to show up in cluster state",
            () -> reader.getClusterState().getCollectionOrNull("c1") != null);

    ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
    assertFalse(ref.isLazilyLoaded());
    assertEquals(0, ref.get().getZNodeVersion());
    // no more dummy node
    assertEquals(0, ref.get().getChildNodesVersion());

    DocCollection collection = ref.get();
    PerReplicaStates prs =
        PerReplicaStatesOps.fetch(
            collection.getZNode(), fixture.zkClient, collection.getPerReplicaStates());
    PerReplicaStatesOps.addReplica("r1", Replica.State.DOWN, false, prs)
        .persist(collection.getZNode(), fixture.zkClient);
    fiveSecondTimeOut()
        .waitFor(
            "Timeout on waiting for c1 updated to have PRS state r1",
            () -> hasPrsState(reader, "c1", "r1", Replica.State.DOWN));

    ref = reader.getClusterState().getCollectionRef("c1");
    assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version
    assertEquals(1, ref.get().getChildNodesVersion()); // but child version should be 1 now

    prs = ref.get().getPerReplicaStates();
    PerReplicaStatesOps.flipState("r1", Replica.State.ACTIVE, prs)
        .persist(collection.getZNode(), fixture.zkClient);
    fiveSecondTimeOut()
        .waitFor(
            "Timeout on waiting for c1 updated to have PRS state r1 marked as ACTIVE",
            () -> hasPrsState(reader, "c1", "r1", Replica.State.ACTIVE));

    ref = reader.getClusterState().getCollectionRef("c1");
    assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version
    // but child version should be 3 now (1 del + 1 add)
    assertEquals(3, ref.get().getChildNodesVersion());

    // now delete the collection
    wc = new ZkWriteCommand("c1", null);
    writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
    clusterState = writer.writePendingUpdates();
    fiveSecondTimeOut()
        .waitFor(
            "Timeout on waiting for c1 to be removed from cluster state",
            () -> reader.getClusterState().getCollectionOrNull("c1") == null);

    reader.unregisterCore("c1");
    // re-add the same collection
    wc = new ZkWriteCommand("c1", state);
    writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
    writer.writePendingUpdates();
    // re-register, otherwise the child watch would be missing from collection deletion
    reader.registerCore("c1");

    fiveSecondTimeOut()
        .waitFor(
            "Timeout on waiting for c1 to show up in cluster state again",
            () -> reader.getClusterState().getCollectionOrNull("c1") != null);
    ref = reader.getClusterState().getCollectionRef("c1");
    assertFalse(ref.isLazilyLoaded());
    assertEquals(0, ref.get().getZNodeVersion());
    assertEquals(0, ref.get().getChildNodesVersion()); // child node version is reset

    // re-add PRS
    collection = ref.get();
    prs =
        PerReplicaStatesOps.fetch(
            collection.getZNode(), fixture.zkClient, collection.getPerReplicaStates());
    PerReplicaStatesOps.addReplica("r1", Replica.State.DOWN, false, prs)
        .persist(collection.getZNode(), fixture.zkClient);
    fiveSecondTimeOut()
        .waitFor(
            "Timeout on waiting for c1 updated to have PRS state r1",
            () -> hasPrsState(reader, "c1", "r1", Replica.State.DOWN));

    ref = reader.getClusterState().getCollectionRef("c1");

    // state.json was deleted and re-created, so the child version restarted from 0; one add -> 1
    assertEquals(1, ref.get().getChildNodesVersion());
  }

  /** Returns a new 5 second {@link TimeOut} measured from now. */
  private static TimeOut fiveSecondTimeOut() {
    return new TimeOut(5000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
  }

  /**
   * Null-safe poll predicate: true once the reader's view of {@code collectionName} has a PRS entry
   * for {@code replicaName} in state {@code expected}. Missing intermediate data yields false, so
   * the surrounding wait retries instead of failing with a NullPointerException.
   */
  private static boolean hasPrsState(
      ZkStateReader reader, String collectionName, String replicaName, Replica.State expected) {
    DocCollection c = reader.getCollection(collectionName);
    return c != null
        && c.getPerReplicaStates() != null
        && c.getPerReplicaStates().get(replicaName) != null
        && c.getPerReplicaStates().get(replicaName).state == expected;
  }

  /** Runs the forcibly-refresh scenario with the default (uncompressed) fixture. */
  public void testForciblyRefreshAllClusterState() throws Exception {
    verifyForciblyRefreshAllClusterState(true);
  }

  /**
   * Runs the forcibly-refresh scenario against a fixture whose writer is built with {@code
   * minStateByteLenForCompression = 0}.
   */
  public void testForciblyRefreshAllClusterStateCompressed() throws Exception {
    // Clear the field before closing, so that if re-creating the fixture fails, tearDown does not
    // close the already-closed fixture a second time.
    TestFixture defaultFixture = fixture;
    fixture = null;
    defaultFixture.close();
    fixture = setupTestFixture(getTestName(), 0);
    verifyForciblyRefreshAllClusterState(false);
  }

  /**
   * Shared scenario for the forcibly-refresh tests. It checks that {@link
   * ZkStateReader#forciblyRefreshAllClusterStateSlow()} picks up the creation, update and deletion
   * of a watched collection (c1). It also checks that a newly created, unwatched collection (c2) is
   * exposed as a lazily loaded reference.
   *
   * @param withPrsSupplier whether written collection states carry a ZK-client-backed PRS supplier
   *     ({@code false} reproduces the compressed variant, which passed {@code null})
   */
  private void verifyForciblyRefreshAllClusterState(boolean withPrsSupplier) throws Exception {
    ZkStateWriter writer = fixture.writer;
    ZkStateReader reader = fixture.reader;

    reader.registerCore("c1"); // watching c1, so it should get non lazy reference
    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);

    reader.forciblyRefreshAllClusterStateSlow();
    // Initially there should be no c1 collection.
    assertNull(reader.getClusterState().getCollectionRef("c1"));

    // create new collection
    ZkWriteCommand wc =
        new ZkWriteCommand("c1", createEmptyCollectionState("c1", 0, withPrsSupplier));
    writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
    writer.writePendingUpdates();

    assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));

    reader.forciblyRefreshAllClusterStateSlow();
    ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
    assertNotNull(ref);
    assertFalse(ref.isLazilyLoaded());
    assertEquals(0, ref.get().getZNodeVersion());

    // update the collection
    wc =
        new ZkWriteCommand(
            "c1", createEmptyCollectionState("c1", ref.get().getZNodeVersion(), withPrsSupplier));
    writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
    writer.writePendingUpdates();

    reader.forciblyRefreshAllClusterStateSlow();
    ref = reader.getClusterState().getCollectionRef("c1");
    assertNotNull(ref);
    assertFalse(ref.isLazilyLoaded());
    assertEquals(1, ref.get().getZNodeVersion());

    // delete the collection c1, add a collection c2 that is NOT watched
    ZkWriteCommand wc1 = new ZkWriteCommand("c1", null);

    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
    ZkWriteCommand wc2 =
        new ZkWriteCommand("c2", createEmptyCollectionState("c2", 0, withPrsSupplier));

    writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null);
    writer.writePendingUpdates();

    reader.forciblyRefreshAllClusterStateSlow();
    ref = reader.getClusterState().getCollectionRef("c1");
    assertNull(ref);

    ref = reader.getClusterState().getCollectionRef("c2");
    assertNotNull(ref);
    assertTrue(
        "c2 should have been lazily loaded but is not!",
        ref.isLazilyLoaded()); // c2 should be lazily loaded as it's not watched
    assertEquals(0, ref.get().getZNodeVersion());
  }

  /**
   * Builds a collection state with no slices, the default configset and the default router.
   *
   * @param name collection name
   * @param zNodeVersion version passed to {@link DocCollection#create}
   * @param withPrsSupplier attach a PRS supplier for this collection's own path, or pass null
   */
  private DocCollection createEmptyCollectionState(
      String name, int zNodeVersion, boolean withPrsSupplier) {
    return DocCollection.create(
        name,
        new HashMap<>(),
        Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
        DocRouter.DEFAULT,
        zNodeVersion,
        Instant.now(),
        withPrsSupplier
            ? PerReplicaStatesOps.getZkClientPrsSupplier(
                fixture.zkClient, DocCollection.getCollectionPath(name))
            : null);
  }

  /**
   * Verifies that {@link ZkStateReader#getCurrentCollections()} reports both a watched collection
   * (c1) and an unwatched, lazily loaded one (c2) once their state has been written.
   */
  public void testGetCurrentCollections() throws Exception {
    ZkStateWriter writer = fixture.writer;
    ZkStateReader reader = fixture.reader;

    reader.registerCore("c1"); // listen to c1, which does not exist yet
    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
    reader.forceUpdateCollection("c1");
    Set<String> currentCollections = reader.getCurrentCollections();
    assertEquals(0, currentCollections.size()); // no active collections yet

    // now create both c1 (watched) and c2 (not watched)
    DocCollection state1 =
        DocCollection.create(
            "c1",
            new HashMap<>(),
            Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
            DocRouter.DEFAULT,
            0,
            Instant.now(),
            PerReplicaStatesOps.getZkClientPrsSupplier(
                fixture.zkClient, DocCollection.getCollectionPath("c1")));
    ZkWriteCommand wc1 = new ZkWriteCommand("c1", state1);
    DocCollection state2 =
        DocCollection.create(
            "c2",
            new HashMap<>(),
            Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
            DocRouter.DEFAULT,
            0,
            Instant.now(),
            // c2's PRS supplier must read c2's own state path (previously pointed at c1's)
            PerReplicaStatesOps.getZkClientPrsSupplier(
                fixture.zkClient, DocCollection.getCollectionPath("c2")));

    // do not listen to c2
    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
    ZkWriteCommand wc2 = new ZkWriteCommand("c2", state2);

    writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null);
    writer.writePendingUpdates();

    reader.forceUpdateCollection("c1");
    reader.forceUpdateCollection("c2");

    // should detect exactly both collections (c1 watched, c2 lazy loaded)
    currentCollections = reader.getCurrentCollections();
    assertEquals(Set.of("c1", "c2"), currentCollections);
  }

  /**
   * Simulates the race that can occur when state updates triggered by watch notifications contend
   * with the removal of collection watches.
   *
   * <p>This race should no longer exist now that the code uses a single map for both "collection
   * watches" and "latest state of watched collection".
   *
   * <p>Flow: a background thread keeps rewriting c1's state.json roughly every 100 ms. The test
   * registers a watcher that asks to be removed on its second invocation, then also removes it
   * explicitly. Finally it waits for c1's reference to become lazily loaded again (i.e. no longer
   * watched). Any exception from the background thread is rethrown at the end.
   */
  public void testWatchRaceCondition() throws Exception {
    ExecutorService executorService =
        ExecutorUtil.newMDCAwareSingleThreadExecutor(
            new SolrNamedThreadFactory("zkStateReaderTest"));
    // NOTE(review): injects a 1000 ms delay via CommonTestInjection, presumably into the reader's
    // state-update path to widen the race window — confirm against CommonTestInjection's callers.
    CommonTestInjection.setDelay(1000);
    final AtomicBoolean stopMutatingThread = new AtomicBoolean(false);
    try {
      ZkStateWriter writer = fixture.writer;
      final ZkStateReader reader = fixture.reader;
      fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);

      // start another thread that keeps updating the state until stopMutatingThread is set;
      // any failure is recorded in updateException and rethrown by the test thread
      final AtomicReference<Exception> updateException = new AtomicReference<>();
      executorService.execute(
          () -> {
            try {
              ClusterState clusterState = reader.getClusterState();
              while (!stopMutatingThread.get()) {
                DocCollection collection = clusterState.getCollectionOrNull("c1");
                // write against the version from the last write (0 before the first write)
                int currentVersion = collection != null ? collection.getZNodeVersion() : 0;
                // create new collection
                DocCollection state =
                    DocCollection.create(
                        "c1",
                        new HashMap<>(),
                        Map.of(
                            ZkStateReader.CONFIGNAME_PROP,
                            ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
                        DocRouter.DEFAULT,
                        currentVersion,
                        Instant.now(),
                        PerReplicaStatesOps.getZkClientPrsSupplier(
                            fixture.zkClient, DocCollection.getCollectionPath("c1")));
                ZkWriteCommand wc = new ZkWriteCommand("c1", state);
                writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
                clusterState = writer.writePendingUpdates();
                TimeUnit.MILLISECONDS.sleep(100);
              }
            } catch (Exception e) {
              updateException.set(e);
            }
          });
      // accept no further tasks; the submitted task keeps running until the flag is set
      executorService.shutdown();

      // despite its name, the lambda parameter is presumably the collection's DocCollection
      // (null until c1 exists) — confirm against the waitForState overload
      reader.waitForState(
          "c1",
          10,
          TimeUnit.SECONDS,
          slices -> slices != null); // wait for the state to become available

      final CountDownLatch latch = new CountDownLatch(2);

      // remove itself on 2nd trigger (returning true asks the reader to drop this watcher)
      DocCollectionWatcher dummyWatcher =
          collection -> {
            latch.countDown();
            return latch.getCount() == 0;
          };
      reader.registerDocCollectionWatcher("c1", dummyWatcher);
      assertTrue(
          "Missing expected collection updates after the wait", latch.await(10, TimeUnit.SECONDS));
      // explicit removal racing with the self-removal requested above
      reader.removeDocCollectionWatcher("c1", dummyWatcher);

      // cluster state might not be updated right away by the removeDocCollectionWatcher call
      // above as org.apache.solr.common.cloud.ZkStateReader.Notification might remove the watcher
      // as well and might still be in the middle of updating the cluster state.
      TimeOut timeOut = new TimeOut(2000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
      timeOut.waitFor(
          "The ref is not lazily loaded after waiting",
          () -> reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());

      if (updateException.get() != null) {
        throw (updateException.get());
      }
    } finally {
      // stop the writer loop, undo the injected delay, and wait for the executor to finish
      stopMutatingThread.set(true);
      CommonTestInjection.reset();
      ExecutorUtil.awaitTermination(executorService);
    }
  }

  /**
   * Simulates the race that can arise from the normal way in which the removal of collection
   * StateWatchers is deferred.
   *
   * <p>StateWatchers are registered at the level of Zk code, so when StateWatchers are removed in
   * Solr code, the actual removal is deferred until the next callback for the associated collection
   * fires, at which point the removed watcher should allow itself to expire. If a watcher is
   * re-added for the associated collection in the intervening time, only the most recently added
   * watcher should re-register; the removed watcher should simply expire.
   *
   * <p>Duplicate/redundant StateWatchers should no longer be registered with the new code that
   * tracks the currently registered singleton-per-collection watcher in Solr code, and only
   * re-registers the currently active watcher, with all other watchers allowing themselves to
   * expire.
   *
   * <p>Flow: register and immediately remove 10 throwaway watchers, then register one tracking
   * watcher. Write c1 ten times, rendezvousing with the watcher after each write. Finally assert the
   * tracking watcher ran exactly once per written znode version.
   */
  public void testStateWatcherRaceCondition() throws Exception {
    ZkStateWriter writer = fixture.writer;
    final ZkStateReader reader = fixture.reader;
    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
    int extraWatchers = 10;
    int iterations = 10;
    for (int i = 0; i < extraWatchers; i++) {
      // add and remove a bunch of watchers (each returns false, i.e. never asks to be removed)
      DocCollectionWatcher w = (coll) -> false;
      try {
        reader.registerDocCollectionWatcher("c1", w);
      } finally {
        reader.removeDocCollectionWatcher("c1", w);
      }
    }
    // znode version -> number of times the tracking watcher saw that version
    final ConcurrentHashMap<Integer, LongAdder> invoked = new ConcurrentHashMap<>();
    // two parties: the tracking watcher's callback and the test thread below
    CyclicBarrier barrier = new CyclicBarrier(2);
    reader.registerDocCollectionWatcher(
        "c1",
        (coll) -> {
          // add a watcher that tracks how many times it's invoked per znode version
          if (coll != null) {
            invoked.computeIfAbsent(coll.getZNodeVersion(), (k) -> new LongAdder()).increment();
            try {
              barrier.await(250, TimeUnit.MILLISECONDS);
            } catch (InterruptedException | TimeoutException | BrokenBarrierException e) {
              throw new RuntimeException(e);
            }
          }
          return false;
        });

    ClusterState clusterState = reader.getClusterState();
    // -1 is used for the first write (the collection does not exist yet); afterwards this holds the
    // znode version returned by the previous write
    int dataVersion = -1;
    for (int i = 0; i < iterations; i++) {
      // create or update collection
      DocCollection state =
          DocCollection.create(
              "c1",
              new HashMap<>(),
              Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
              DocRouter.DEFAULT,
              dataVersion,
              Instant.now(),
              PerReplicaStatesOps.getZkClientPrsSupplier(
                  fixture.zkClient, DocCollection.getCollectionPath("c1")));
      ZkWriteCommand wc = new ZkWriteCommand("c1", state);
      writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
      clusterState = writer.writePendingUpdates();
      barrier.await(250, TimeUnit.MILLISECONDS); // wait for the watch callback to execute
      // creates sibling znodes /collections/c10 .. /collections/c19; NOTE(review): presumably to
      // trigger collection-list watches between state updates — confirm intent
      fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1" + i, true);
      dataVersion = clusterState.getCollectionOrNull("c1").getZNodeVersion();
    }
    // expect to have been invoked for each iteration ...
    assertEquals(iterations, invoked.size());
    // ... and only _once_ for each iteration
    assertTrue(
        "wrong number of watchers (expected 1): " + invoked,
        invoked.values().stream().mapToLong(LongAdder::sum).allMatch((l) -> l == 1));
  }

  /**
   * Ensure that collection state fetching (getCollectionLive etc.) does not throw an exception
   * when the state.json is deleted in between the state.json read and the PRS entries read.
   *
   * <p>A PRS-enabled collection with a single replica is created first. A breakpoint installed at
   * {@code PerReplicaStatesOps/beforePrsFetch} then removes the collection's state.json path from
   * ZooKeeper. A second breakpoint at {@code ZkStateReader/exercised} records whether a {@link
   * PerReplicaStatesOps.PrsZkNodeNotFoundException} was observed. The test expects {@code
   * getCollectionLive} to eventually return {@code null} rather than throw, and expects that
   * exception to have been observed along the way.
   */
  public void testDeletePrsCollection() throws Exception {
    ZkStateWriter writer = fixture.writer;
    ZkStateReader reader = fixture.reader;
    // Per-wait budget. Each waitFor below builds its own TimeOut because a TimeOut's deadline is
    // computed when it is constructed; sharing one instance would make later waits inherit only
    // the time left over by earlier ones.
    final long waitTimeoutMs = 5000;

    String collectionName = "c1";
    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName, true);

    ClusterState clusterState = reader.getClusterState();

    String nodeName = "node1:10000_solr";
    String sliceName = "shard1";
    Slice slice = new Slice(sliceName, Map.of(), Map.of(), collectionName);

    // create new collection (PRS enabled, single slice with no replicas yet)
    DocCollection state =
        DocCollection.create(
            collectionName,
            Map.of(sliceName, slice),
            Collections.singletonMap(DocCollection.CollectionStateProps.PER_REPLICA_STATE, true),
            DocRouter.DEFAULT,
            0,
            Instant.now(),
            PerReplicaStatesOps.getZkClientPrsSupplier(
                fixture.zkClient, DocCollection.getCollectionPath(collectionName)));
    ZkWriteCommand wc = new ZkWriteCommand(collectionName, state);
    writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
    clusterState = writer.writePendingUpdates();

    new TimeOut(waitTimeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME)
        .waitFor(
            "Timeout on waiting for c1 to show up in cluster state",
            () -> reader.getClusterState().getCollectionOrNull(collectionName) != null);

    String collectionPath = ZkStateReader.getCollectionPath(collectionName);

    // now create the replica, take note that this has to be done after DocCollection creation with
    // empty slice, otherwise the DocCollection ctor would fetch the PRS entries and throw
    // exceptions
    String replicaBaseUrl = Utils.getBaseUrlForNodeName(nodeName, "http");

    String replicaName = "replica1";
    Replica replica =
        new Replica(
            replicaName,
            Map.of(
                ZkStateReader.CORE_NAME_PROP,
                "core1",
                ZkStateReader.STATE_PROP,
                Replica.State.ACTIVE.toString(),
                ZkStateReader.NODE_NAME_PROP,
                nodeName,
                ZkStateReader.BASE_URL_PROP,
                replicaBaseUrl,
                ZkStateReader.REPLICA_TYPE,
                Replica.Type.NRT.name()),
            collectionName,
            sliceName);

    wc =
        new ZkWriteCommand(
            collectionName, SliceMutator.updateReplica(state, slice, replica.getName(), replica));
    writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
    // only the write itself matters here; the returned ClusterState is not used afterwards
    writer.writePendingUpdates();

    new TimeOut(waitTimeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME)
        .waitFor(
            "Timeout on waiting for replica to show up in cluster state",
            () ->
                reader.getCollectionLive(collectionName).getSlice(sliceName).getReplica(replicaName)
                    != null);

    try (CommonTestInjection.BreakpointSetter breakpointSetter =
        new CommonTestInjection.BreakpointSetter()) {
      // set breakpoint such that after state.json fetch and before PRS entry fetch, we can delete
      // the state.json and PRS entries to trigger the race condition
      breakpointSetter.setImplementation(
          PerReplicaStatesOps.class.getName() + "/beforePrsFetch",
          (args) -> {
            try {
              // this is invoked after ZkStateReader.fetchCollectionState has fetched the state.json
              // but before PRS entries.
              // call delete state.json on ZK directly, very tricky to control execution order with
              // writer.enqueueUpdate
              reader.getZkClient().clean(collectionPath);
            } catch (InterruptedException e) {
              // restore the interrupt status so callers up the stack can still observe it
              Thread.currentThread().interrupt();
              throw new RuntimeException(e);
            } catch (KeeperException e) {
              throw new RuntimeException(e);
            }
          });

      // set breakpoint to verify the expected PrsZkNodeNotFoundException is indeed thrown within
      // the execution flow, such exception is caught within the logic and not thrown to the
      // caller
      AtomicBoolean prsZkNodeNotFoundExceptionThrown = new AtomicBoolean(false);
      breakpointSetter.setImplementation(
          ZkStateReader.class.getName() + "/exercised",
          (args) -> {
            if (args[0] instanceof PerReplicaStatesOps.PrsZkNodeNotFoundException) {
              prsZkNodeNotFoundExceptionThrown.set(true);
            }
          });

      // getCollectionLive must not throw even though the breakpoint above deletes state.json
      // (and its PRS entries) between the state.json read and the PRS entry read
      new TimeOut(waitTimeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME)
          .waitFor(
              "Timeout waiting for collection state to become null",
              () -> reader.getCollectionLive(collectionName) == null);

      assertTrue(
          "Expected PrsZkNodeNotFoundException to be raised and handled internally",
          prsZkNodeNotFoundExceptionThrown.get());
    }
  }

  /**
   * Registers two live nodes that both publish a valid SemVer string and checks that the smaller
   * of the two versions is reported as the lowest.
   */
  public void testFetchLowestSolrVersion_validNodes() throws Exception {
    final SolrZkClient client = fixture.zkClient;
    final ZkStateReader stateReader = fixture.reader;
    final String liveNodesRoot = ZkStateReader.LIVE_NODES_ZKNODE;

    // Start from an empty live_nodes directory.
    for (String child : client.getChildren(liveNodesRoot, null, true)) {
      client.delete(liveNodesRoot + "/" + child, -1, true);
    }

    // {node name, published version}, created in this order.
    final String[][] nodeVersions = {{"node1_solr", "9.1.2"}, {"node2_solr", "9.0.1"}};
    for (String[] entry : nodeVersions) {
      String liveNodeName = entry[0];
      Map<String, Object> liveNodeProps = new HashMap<>();
      liveNodeProps.put(LIVE_NODE_SOLR_VERSION, entry[1]);
      liveNodeProps.put(LIVE_NODE_NODE_NAME, liveNodeName);
      client.create(
          liveNodesRoot + "/" + liveNodeName,
          Utils.toJSON(liveNodeProps),
          CreateMode.EPHEMERAL,
          true);
    }

    var lowest = stateReader.fetchLowestSolrVersion();
    assertTrue("Expected lowest version to be present", lowest.isPresent());
    assertEquals("Expected lowest version to be 9.0.1", SolrVersion.valueOf("9.0.1"), lowest.get());
  }

  /**
   * Registers a single live node whose payload is zero bytes long and checks which version is
   * reported as the lowest.
   */
  public void testFetchLowestSolrVersion_noData() throws Exception {
    final String liveNodesRoot = ZkStateReader.LIVE_NODES_ZKNODE;
    final SolrZkClient client = fixture.zkClient;
    final ZkStateReader stateReader = fixture.reader;

    // Remove whatever live nodes are currently registered.
    List<String> existingChildren = client.getChildren(liveNodesRoot, null, true);
    for (String child : existingChildren) {
      client.delete(liveNodesRoot + "/" + child, -1, true);
    }

    // A live node that carries no data at all.
    final byte[] emptyPayload = new byte[0];
    client.create(liveNodesRoot + "/empty_node", emptyPayload, CreateMode.EPHEMERAL, true);

    var lowest = stateReader.fetchLowestSolrVersion();
    assertTrue("Expected lowest version to be present for empty node", lowest.isPresent());
    // NOTE(review): 9.9.0 appears to be the version assumed for a data-less live node; confirm
    // against ZkStateReader.fetchLowestSolrVersion.
    assertEquals("after empty node", SolrVersion.valueOf("9.9.0"), lowest.get());
  }

  /**
   * Checks the reported lowest version in two phases. First, a lone live node publishing a very
   * high version is registered. Then a second live node with an empty payload is added alongside
   * it.
   */
  public void testFetchLowestSolrVersion_blankAndHighVersion() throws Exception {
    final ZkStateReader stateReader = fixture.reader;
    final SolrZkClient client = fixture.zkClient;
    final String liveNodesRoot = ZkStateReader.LIVE_NODES_ZKNODE;

    // Remove whatever live nodes are currently registered.
    for (String child : client.getChildren(liveNodesRoot, null, true)) {
      client.delete(liveNodesRoot + "/" + child, -1, true);
    }

    // Phase 1: only a node that advertises a very high version.
    final String highNodePath = liveNodesRoot + "/" + "node1_solr";
    final byte[] highVersionPayload =
        Utils.toJSON(Map.<String, Object>of(LIVE_NODE_SOLR_VERSION, "888.0.0"));
    client.create(highNodePath, highVersionPayload, CreateMode.EPHEMERAL, true);

    var afterHigh = stateReader.fetchLowestSolrVersion();
    assertTrue(
        "Expected lowest version to be present for high version node", afterHigh.isPresent());
    assertEquals("after high node", SolrVersion.valueOf("888.0.0"), afterHigh.get());

    // Phase 2: add a node whose payload is empty.
    final String blankNodePath = liveNodesRoot + "/" + "node2_solr";
    client.create(blankNodePath, new byte[0], CreateMode.EPHEMERAL, true);

    var afterBlank = stateReader.fetchLowestSolrVersion();
    assertTrue("Expected lowest version to be present for empty node", afterBlank.isPresent());
    assertEquals("after empty node", SolrVersion.valueOf("9.9.0"), afterBlank.get());
  }

  /** Checks that an empty Optional is reported once every live node has been removed. */
  public void testFetchLowestSolrVersion_noLiveNodes() throws Exception {
    final String liveNodesRoot = ZkStateReader.LIVE_NODES_ZKNODE;
    final ZkStateReader stateReader = fixture.reader;
    final SolrZkClient client = fixture.zkClient;

    // Remove every registered live node.
    final List<String> existingChildren = client.getChildren(liveNodesRoot, null, true);
    for (String child : existingChildren) {
      client.delete(liveNodesRoot + "/" + child, -1, true);
    }

    final boolean versionPresent = stateReader.fetchLowestSolrVersion().isPresent();
    assertFalse("Expected no lowest version when no live nodes exist", versionPresent);
  }
}
